use std::collections::HashMap;

use log::error;
use postgres::{Config, NoTls};
use r2d2::{Pool, PooledConnection};
use r2d2_postgres::PostgresConnectionManager;
use serde_json::{json, Value};
use strfmt::strfmt;
use time::PrimitiveDateTime;

use super::{IDatabase, TABLE_TASKS};
use crate::datasource::record::{IndexeaTask, Record};
use crate::datasource::{DataSource, IDataSource};
use crate::task::Task;

/// PostgreSQL-backed data source.
///
/// Holds an r2d2 connection pool of plain (non-TLS) `postgres` connections;
/// all queries check a connection out of this pool on demand.
pub struct Postgres {
    // Shared connection pool; sized in `IDataSource::new` (max 5 connections).
    pub pool: Pool<PostgresConnectionManager<NoTls>>,
}

impl IDataSource for Postgres {
    fn new(ds: &DataSource) -> Self {
        let cfg: Config = ds.url.as_str().parse().unwrap(); //"host=localhost user=postgres".parse().unwrap();
        let manager = PostgresConnectionManager::new(cfg, NoTls);
        // 创建连接池配置
        let pool = Pool::builder().max_size(5).build(manager).unwrap();
        Postgres { pool }
    }

    fn init(&self, tasks: &HashMap<&String, &Task>) -> Result<(), String> {
        self.init_db(tasks)
    }

    fn ping(&self) -> Result<bool, String> {
        self.conn().query("SELECT 1", &[]).unwrap();
        Ok(true)
    }

    fn records(
        &self,
        task: &Task,
        limit: u32,
        offset: u32,
        scroll_id: u64,
    ) -> Result<Vec<Value>, String> {
        let sql = Postgres::json_sql(&self.list_records_sql(task, limit, offset, scroll_id));
        let mut records = Vec::new();
        match self.conn().query(sql.as_str(), &[]) {
            Ok(rows) => {
                //read columns
                for row in rows {
                    if let Ok(json) = row.try_get::<usize, Value>(0) {
                        if let Some(items) = json.as_array() {
                            records = items.clone();
                        }
                    }
                }
            }
            Err(e) => return Err(e.to_string()),
        }
        Ok(records)
    }

    fn tasks(&self, name: &String, task: &Task, count: i32) -> Result<Vec<Record>, String> {
        let sql = "SELECT * FROM indexea_tasks WHERE task = $1 AND status = 0 ORDER BY id LIMIT $2";
        let mut conn = self.conn();
        match conn.query(sql, &[name, &(count as i64)]) {
            Ok(tasks) => {
                let mut records = Vec::new();
                for row in tasks {
                    let t_record = IndexeaTask {
                        id: row.get::<_, i32>("id") as i64,
                        task: row.get::<_, String>("task"),
                        field: row.get::<_, String>("field"),
                        value: row.get::<_, String>("value"),
                        ftype: row.get::<_, i16>("ftype") as i8,
                        ops: row.get::<_, i16>("ops") as i8,
                        status: row.get::<_, i16>("status") as i8,
                        created: row.get::<_, PrimitiveDateTime>("created"),
                        updated: row.get::<_, Option<PrimitiveDateTime>>("updated"),
                    };
                    if t_record.ops == 3 {
                        // delete record cannot get row record, composite value from task
                        let task_clone = t_record.clone();
                        let mut hm = serde_json::Map::new();
                        hm.insert(t_record.field, json!(t_record.value));
                        records.push(Record {
                            value: serde_json::Value::Object(hm),
                            task: task_clone,
                            index: String::new(),
                        });
                    } else {
                        let sql = Postgres::json_sql(&self.get_select_sql(task, &t_record));
                        match self.conn().query(sql.as_str(), &[]) {
                            Ok(rows) => {
                                //read columns
                                for row in rows {
                                    if let Ok(json) = row.try_get::<usize, Value>(0) {
                                        if let Some(items) = json.as_array() {
                                            records.push(Record {
                                                value: items[0].clone(),
                                                task: t_record.clone(),
                                                index: String::new(),
                                            });
                                        } else {
                                            records.push(Record {
                                                value: serde_json::Value::Null,
                                                task: t_record.clone(),
                                                index: String::new(),
                                            });
                                        }
                                    }
                                }
                            }
                            Err(e) => return Err(e.to_string()),
                        }
                    }
                }
                Ok(records)
            }
            Err(e) => Err(e.to_string()),
        }
    }

    fn finish(&self, records: &Vec<Record>) -> Result<(), String> {
        let mut conn = self.conn();
        let stmt = conn
            .prepare(
                "UPDATE indexea_tasks SET status = $1, updated = CURRENT_TIMESTAMP WHERE id = $2",
            )
            .unwrap();
        let mut err = false;
        for rec in records {
            if let Err(e) = conn.execute(&stmt, &[&(rec.task.status as i16), &(rec.task.id as i32)])
            {
                err = true;
                error!(
                    "update task [id={}] status({}) failed, reason: {:?}",
                    rec.task.id, rec.task.status, e
                )
            }
        }
        if err {
            return Err(String::from("update tasks status failed, See the log for details"));
        }
        Ok(())
    }

    fn clean(&self, tasks: &HashMap<&String, &Task>) -> Result<(), String> {
        self.clean_db(tasks)
    }
}

impl IDatabase for Postgres {
    /// Check whether the indexea tasks table exists by probing it with a
    /// cheap query; any error (missing table, permission, …) counts as "no".
    fn tasks_table_exists(&self) -> Result<bool, String> {
        let sql = format!("SELECT COUNT(id) FROM {}", TABLE_TASKS);
        match self.conn().query_one(sql.as_str(), &[]) {
            Ok(_row) => Ok(true),
            Err(_) => Ok(false),
        }
    }

    /// Create the change-capture table and its lookup index.
    ///
    /// `IF NOT EXISTS` is applied to the index as well as the table:
    /// without it, re-running setup against an existing schema fails on
    /// the duplicate index even though the table creation is idempotent.
    fn create_tasks_table(&self) -> Result<(), String> {
        let sql = r#"
                        CREATE TABLE IF NOT EXISTS indexea_tasks (
                          id SERIAL PRIMARY KEY,
                          task VARCHAR(32) NOT NULL,
                          field VARCHAR(32) NOT NULL,
                          value VARCHAR(64) NOT NULL,
                          ftype INT2 NOT NULL,
                          ops INT2 NOT NULL,
                          status INT2 NOT NULL DEFAULT 0,
                          created TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                          updated TIMESTAMP 
                        );
                        CREATE INDEX IF NOT EXISTS idx_indexea_tasks ON indexea_tasks (task, status) WITH (deduplicate_items = off);
                    "#;
        if let Err(e) = self.conn().batch_execute(sql) {
            return Err(e.to_string());
        }
        Ok(())
    }

    fn drop_tasks_table(&self) -> Result<(), String> {
        let sql = "DROP TABLE IF EXISTS indexea_tasks CASCADE";
        if let Err(e) = self.conn().batch_execute(sql) {
            return Err(e.to_string());
        }
        Ok(())
    }

    /// Install AFTER INSERT/UPDATE/DELETE triggers on the task's source
    /// table that append change rows to `indexea_tasks`.
    ///
    /// The three trigger bodies differ only in operation name, trigger
    /// event, row alias (`NEW` after insert, `OLD` otherwise) and the
    /// `ops` code, so one template is instantiated three times.
    fn create_triggers(&self, name: &String, task: &Task) -> Result<(), String> {
        let mut conn = self.conn();

        // check primary field type
        // TODO: derive the real primary-field type instead of hard-coding 1.
        let field_type = 1.to_string();

        let mut vars = HashMap::new();
        vars.insert("task".to_string(), name.as_str());
        vars.insert("table".to_string(), task.table.as_ref().unwrap().as_str());
        vars.insert("field".to_string(), task.primary.as_ref().unwrap().as_str());
        vars.insert("type".to_string(), field_type.as_str());

        // (op name, trigger event, surviving row alias, ops code)
        let variants = [
            ("insert", "INSERT", "NEW", "1"),
            ("update", "UPDATE", "OLD", "2"),
            ("delete", "DELETE", "OLD", "3"),
        ];
        for &(op, event, alias, ops) in variants.iter() {
            vars.insert("op".to_string(), op);
            vars.insert("event".to_string(), event);
            vars.insert("alias".to_string(), alias);
            vars.insert("ops".to_string(), ops);
            let sql = r#"
        DROP TRIGGER IF EXISTS indexea_after_{op}_for_{task} ON {table};
        CREATE OR REPLACE FUNCTION indexea_{op}_to_tasks_{task}()
        RETURNS TRIGGER AS $$
        BEGIN
            INSERT INTO indexea_tasks(task,field,value,ftype,ops) VALUES('{task}', '{field}', {alias}.{field}, {type}, {ops});
            RETURN NULL;
        END; $$ LANGUAGE 'plpgsql';
        CREATE TRIGGER indexea_after_{op}_for_{task}
            AFTER {event}
            ON {table} FOR EACH ROW EXECUTE PROCEDURE indexea_{op}_to_tasks_{task}();
        "#;
            // Propagate template errors instead of panicking.
            let sql = strfmt(sql, &vars).map_err(|e| e.to_string())?;
            if let Err(e) = conn.batch_execute(sql.as_str()) {
                error!("{:?}", e);
                return Err(e.to_string());
            }
        }

        Ok(())
    }

    /// Remove the change-capture functions for `name`; CASCADE also drops
    /// the triggers that depend on each function.
    fn drop_triggers(&self, name: &String, _task: &Task) -> Result<(), String> {
        let sql = r#"
        DROP FUNCTION IF EXISTS indexea_insert_to_tasks_{task} CASCADE;
        DROP FUNCTION IF EXISTS indexea_update_to_tasks_{task} CASCADE;
        DROP FUNCTION IF EXISTS indexea_delete_to_tasks_{task} CASCADE;
        "#;
        let mut vars = HashMap::new();
        vars.insert("task".to_string(), name.as_str());
        let sql = strfmt(sql, &vars).map_err(|e| e.to_string())?;
        if let Err(e) = self.conn().batch_execute(sql.as_str()) {
            error!("{:?}", e);
            return Err(e.to_string());
        }
        Ok(())
    }
}

impl Postgres {
    /// Check a connection out of the pool.
    ///
    /// Panics (with a descriptive message) when the pool is exhausted or
    /// the server is unreachable; callers needing a recoverable probe
    /// should use `ping` instead.
    fn conn(&self) -> PooledConnection<PostgresConnectionManager<NoTls>> {
        self.pool
            .get()
            .expect("failed to get postgres connection from pool")
    }

    /// Wrap `sql` so PostgreSQL aggregates its whole result set into a
    /// single JSON array (one row, one `JSON_AGG` column).
    ///
    /// Takes `&str` rather than `&String` so any string slice works;
    /// existing `&String` callers still compile via deref coercion.
    fn json_sql(sql: &str) -> String {
        format!("SELECT JSON_AGG(s) FROM ({}) s", sql)
    }
}
